package com.xj.kafka.consumer;

import com.xj.kafka.thread.MessageDisposePoolFactory;
import com.xj.kafka.thread.MessageDisposeThread;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * User: bjxiajun
 * Date: 13-10-17
 * Time: 下午3:37
 */
public class BaseConsumer {
    private ConsumerConfig consumerConfig;
    private int threadNum;//每一个stream使用的线程数

    /**
     * 构造函数
     * @param consumerConfig consumer参数信息
     * @param threadNum 每个主题stream使用的线程数
     */
    public BaseConsumer(ConsumerConfig consumerConfig, int threadNum) {
        if (consumerConfig == null) {
            throw new NullPointerException("ConsumerConfig is null.");
        }
        if (threadNum <= 0) {
            throw new IllegalArgumentException("Processing threads cannot be less than 1.");
        }
        this.consumerConfig = consumerConfig;
        this.threadNum = threadNum;
    }

    /**
     * 启动接收消息线程
     *
     * @param topicMap       主题map
     * @param messageDispose 消息解析类
     */
    public void start(Map<String, Integer> topicMap, IMessageDispose messageDispose) {
        if (topicMap == null) {
            throw new NullPointerException("Topics may not be less than 1.");
        }
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(topicMap);
        for (Map.Entry key : topicMap.entrySet()) {
            List<KafkaStream<Message>> streams = topicMessageStreams.get(key.getKey());
            ExecutorService executor = Executors.newFixedThreadPool(threadNum, new MessageDisposePoolFactory());//使用守护线程
            for (final KafkaStream<Message> stream : streams) {
                executor.submit(new MessageDisposeThread(stream, messageDispose));
            }
        }
    }
}
